package com.wang.dmp.graphx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Finds groups of users that share a common friend, using Spark GraphX
 * connected components.
 *
 * The graph models "user -> friend" edges. Every connected component
 * therefore contains the set of users reachable through at least one
 * shared friend vertex, and the component is labeled by the smallest
 * vertex id it contains.
 */
object CommonFriends {

  // Silence verbose Spark/Hadoop logging; runs once at object initialization.
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("共同好友推荐")
      .setMaster("local[*]")
      // Kryo is faster and more compact than default Java serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // 1. Build the vertex set: (vertexId, (name, age)).
    val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(Seq(
      (1L, ("王茵茵", 18)),
      (2L, ("雯雯文", 20)),
      (6L, ("章宇哥", 23)),
      (9L, ("赵丽颖", 30)),
      (133L, ("王一宁", 23)),

      (138L, ("超哥", 22)),
      (16L, ("星星", 24)),
      (21L, ("亮亮", 35)),
      (44L, ("峰峰", 40)),

      (5L, ("海哥", 30)),
      (7L, ("果果", 31)),
      (158L, ("老王", 38))
    ))
    // 2. Build the edge set: each edge links a user to a friend vertex
    //    (133, 138 and 158 act as the shared "friend" hubs). The edge
    //    attribute is unused, so it is a constant 0.
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(Seq(
      Edge(1, 133, 0),
      Edge(2, 133, 0),
      Edge(6, 133, 0),
      Edge(9, 133, 0),

      Edge(6, 138, 0),
      Edge(16, 138, 0),
      Edge(21, 138, 0),
      Edge(44, 138, 0),
      Edge(1, 138, 0),

      Edge(5, 158, 0),
      Edge(7, 158, 0)
    ))

    // 3. Construct the graph and run the connected-components algorithm.
    //    Result: (vertexId, smallestVertexIdInComponent) per vertex.
    val graph = Graph(vertexRDD, edgeRDD)
    val cc = graph.connectedComponents().vertices

    // 4. Join the component labels back onto the user attributes, then
    //    concatenate all names belonging to the same component.
    //    collect() first so the result is printed on the driver — a bare
    //    rdd.foreach(println) would print on the executors in cluster mode.
    cc.join(vertexRDD)
      .map { case (_, (componentMinId, (name, _))) => (componentMinId, name) }
      .reduceByKey((a, b) => a + "," + b)
      .collect()
      .foreach(println)

    // Release the SparkContext and its resources.
    sc.stop()
  }
}
